In this notebook, we're going to train a simple RNN to do time-series prediction. Given some set of input data, it should be able to generate a prediction for the next time step!

- First, we'll create our data
- Then, define an RNN in PyTorch
- Finally, we'll train our network and see how it performs
import torch
from torch import nn
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
%matplotlib inline
## Fix the RNG seed so weight initialization (and hence the results below) are reproducible
torch.manual_seed(42)
<torch._C.Generator at 0x1c5c3f4e750>
## Quick sanity check of the data-generation recipe: degrees -> radians -> sine
degs = np.linspace(0, 180, 19)
rads = degs * np.pi / 180
sin_results = np.sin(rads)
separator = '\n' + '=' * 100 + '\n'
print(f'The angles in rad are:\n{rads}\n')
print(f'The angles in degrees are:\n{degs}\n')
print(separator)
print(f'The Sine of the angles are:\n{sin_results}')
The angles in rad are: [0. 0.17453293 0.34906585 0.52359878 0.6981317 0.87266463 1.04719755 1.22173048 1.3962634 1.57079633 1.74532925 1.91986218 2.0943951 2.26892803 2.44346095 2.61799388 2.7925268 2.96705973 3.14159265] The angles in degrees are: [ 0. 10. 20. 30. 40. 50. 60. 70. 80. 90. 100. 110. 120. 130. 140. 150. 160. 170. 180.] ==================================================================================================== The Sine of the angles are: [0.00000000e+00 1.73648178e-01 3.42020143e-01 5.00000000e-01 6.42787610e-01 7.66044443e-01 8.66025404e-01 9.39692621e-01 9.84807753e-01 1.00000000e+00 9.84807753e-01 9.39692621e-01 8.66025404e-01 7.66044443e-01 6.42787610e-01 5.00000000e-01 3.42020143e-01 1.73648178e-01 1.22464680e-16]
The data generation process in the next cell is following these steps:
The features will be the sine of each number, and the labels will also be sine values, but of the next number — as if shifted one time step into the future.
# how many time steps/data pts are in one batch of data
seq_length = 20
# generate evenly spaced data pts
time_steps = np.linspace(0, 180, seq_length + 1) ## generated 21 points
data = np.sin(time_steps*np.pi/180) ## still 21 values
data.resize((seq_length + 1, 1)) # size becomes (seq_length+1, 1), adds an input_size dimension
## ndarray.resize is an in-place operation: it changes the shape of `data` itself
## an equivalent out-of-place approach would be:
## data = data.reshape(seq_length + 1, 1)
x = data[:-1] # input: all but the last data point
print('the shape of x is:', x.shape)
y = data[1:] # target: all but the first point, i.e. x shifted one step into the future
print('the shape of y is:', y.shape)
# display the input/target pairs on the same axes
plt.figure(figsize=(8,5))
plt.plot(time_steps[1:], x, 'r.', label='input, x') # x
plt.plot(time_steps[1:], y, 'b.', label='target, y') # y
plt.legend(loc='best')
plt.show()
the shape of x is: (20, 1) the shape of y is: (20, 1)
## Same plot as the matplotlib cell above, rendered with Plotly instead
input_trace = go.Scatter(
    x=time_steps[1:], y=x.squeeze(), mode='markers',
    name='input x', marker_color='red')
target_trace = go.Scatter(
    x=time_steps[1:], y=y.squeeze(), mode='markers',
    name='target y', marker_color='blue')
fig = go.Figure(data=[input_trace, target_trace])
fig.update_layout(autosize=False, width=650, height=500,
                  xaxis=dict(title="Angle in degrees"),
                  yaxis=dict(title="Sine of the angle"))
fig.show()
Next, we define an RNN in PyTorch. We'll use nn.RNN to create an RNN layer, then we'll add a last, fully-connected layer to get the output size that we want. An RNN takes in a number of parameters:
Take a look at the RNN documentation to read more about recurrent layers.
## RNN regression model: a (possibly stacked) nn.RNN followed by one linear read-out
class RNN(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        """Build an RNN with `n_layers` layers of size `hidden_dim`, plus a
        fully-connected layer mapping hidden_dim -> output_size."""
        super().__init__()
        self.hidden_dim = hidden_dim
        # batch_first=True -> input/output tensors are (batch, seq, feature)
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
        # last, fully-connected layer producing the prediction per time step
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        """Run the sequence through the RNN and the read-out layer.

        x: (batch_size, seq_length, input_size)
        hidden: (n_layers, batch_size, hidden_dim) or None for a zero state
        returns (output of shape (batch_size*seq_length, output_size), new hidden)
        """
        rnn_out, hidden = self.rnn(x, hidden)
        # flatten (batch, seq, hidden_dim) -> (batch*seq, hidden_dim) for the linear layer
        flat = rnn_out.view(-1, self.hidden_dim)
        return self.fc(flat), hidden
## Define the same RNN model from above but this time it's a debugging version
## So, this version prints the shape of input, hidden and output tensors at each step
## Debugging twin of the RNN above: forward() additionally prints the shape and
## contents of the hidden state and of each intermediate output tensor
class RNN_debug(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super().__init__()
        self.hidden_dim = hidden_dim
        # batch_first=True -> input/output tensors are (batch, seq, feature)
        self.rnn = nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
        # final fully-connected read-out layer
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        # x: (batch_size, seq_length, input_size); hidden: (n_layers, batch_size, hidden_dim)
        rnn_out, hidden = self.rnn(x, hidden)
        print(f'The shape of the RNN hidden state is: {hidden.shape}')
        print('And the hidden state is:')
        print(hidden)
        print(f'\nThe shape of the RNN output before any format is: {rnn_out.shape}')
        print('And the untouched output is:')
        print(rnn_out)
        # flatten to (batch*seq, hidden_dim) so the linear layer sees one row per step
        rnn_out = rnn_out.view(-1, self.hidden_dim)
        print(f'\nThe shape of the RNN output after the format is: {rnn_out.shape}')
        print('And the formatted output is:')
        print(rnn_out)
        output = self.fc(rnn_out)
        print(f'\nThe shape of the FC output is: {output.shape}')
        print('And the FC output is:')
        print(output)
        return output, hidden
As a check that your model is working as expected, test out how it responds to input data.
# Creating an instance of the debug model (prints tensor shapes as it runs)
test_rnn = RNN_debug(input_size=1, output_size=1, hidden_dim=5, n_layers=3)
# generate evenly spaced, test data points
seq_length = 8
time_steps = np.linspace(0, np.pi, seq_length)
data = np.sin(time_steps)
data.resize((seq_length, 1)) ## adding an extra dimension so the data shape is (8 x 1)
test_input = torch.Tensor(data).unsqueeze(0) # give it a batch_size of 1 as first dimension => shape = (1 x 8 x 1)
print('The input size before feeding the input to the network: ', test_input.size())
print('And the input is:')
print(f'{test_input}\n')
# test out rnn sizes; calling the module invokes forward() without writing .forward
test_out, test_h = test_rnn(test_input, None)
The input size before feeding the input to the network: torch.Size([1, 8, 1])
And the input is:
tensor([[[0.0000e+00],
[4.3388e-01],
[7.8183e-01],
[9.7493e-01],
[9.7493e-01],
[7.8183e-01],
[4.3388e-01],
[1.2246e-16]]])
The shape of the RNN hidden state is: torch.Size([3, 1, 5])
And the hidden state is:
tensor([[[ 0.7991, 0.1572, 0.5992, 0.0689, -0.4188]],
[[ 0.4612, 0.4897, -0.8325, -0.1426, -0.4140]],
[[-0.1722, -0.0886, 0.3337, 0.2658, 0.3435]]],
grad_fn=<StackBackward>)
The shape of the RNN output before any format is: torch.Size([1, 8, 5])
And the untouched output is:
tensor([[[-0.2885, 0.0350, 0.1393, 0.0172, 0.2007],
[-0.2364, -0.0781, 0.2343, 0.1595, 0.2335],
[-0.2276, -0.1336, 0.3397, 0.2171, 0.2824],
[-0.2370, -0.1931, 0.3488, 0.2645, 0.3053],
[-0.2105, -0.2029, 0.3860, 0.2730, 0.3053],
[-0.2123, -0.1916, 0.3761, 0.2921, 0.3254],
[-0.1832, -0.1511, 0.3722, 0.2777, 0.3331],
[-0.1722, -0.0886, 0.3337, 0.2658, 0.3435]]],
grad_fn=<TransposeBackward1>)
The shape of the RNN output after the format is: torch.Size([8, 5])
And the formatted output is:
tensor([[-0.2885, 0.0350, 0.1393, 0.0172, 0.2007],
[-0.2364, -0.0781, 0.2343, 0.1595, 0.2335],
[-0.2276, -0.1336, 0.3397, 0.2171, 0.2824],
[-0.2370, -0.1931, 0.3488, 0.2645, 0.3053],
[-0.2105, -0.2029, 0.3860, 0.2730, 0.3053],
[-0.2123, -0.1916, 0.3761, 0.2921, 0.3254],
[-0.1832, -0.1511, 0.3722, 0.2777, 0.3331],
[-0.1722, -0.0886, 0.3337, 0.2658, 0.3435]], grad_fn=<ViewBackward>)
The shape of the FC output is: torch.Size([8, 1])
And the FC output is:
tensor([[-0.1420],
[-0.1764],
[-0.1857],
[-0.2093],
[-0.2006],
[-0.2046],
[-0.1819],
[-0.1668]], grad_fn=<AddmmBackward>)
### Tensor Dimensions for the RNN

- **sequence length**: the number of elements in the sequence; for instance, if we feed the network a vector of length 20, the sequence length is 20.
- **input_size**: the number of features per time step. If the data is fed one element ("feature") per time step, then input_size = 1; if we feed two elements per time step (say, one is the stock price in Canada and the other is the CAD-to-USD rate at that time), then input_size = 2.
- **batch_first**: a boolean controlling whether the batch_size is the first dimension; if we set it to True and each batch has one input sequence, then batch_size = 1.
- **hidden_dim**: the length of the hidden state vector.
- **n_layers**: the number of stacked RNN layers we have specified.

At each time step, the RNN output has shape (batch size x sequence length x hidden state vector length), the hidden state has shape (number of RNN layers x batch size x hidden state vector length), and the fully-connected output has shape (sequence length x output size).

Next, we'll instantiate an RNN with some specified hyperparameters. Then train it over a series of steps, and see how it performs.
# decide on hyperparameters
seq_length=18      # points per training window (the train loop generates seq_length+1 and shifts)
input_size=1       # one feature per time step: the sine value
output_size=1      # predict a single value per time step
hidden_dim=32      # length of the hidden state vector
n_layers=3         # number of stacked RNN layers
# instantiate an RNN
rnn = RNN(input_size, output_size, hidden_dim, n_layers)
print(rnn)
RNN( (rnn): RNN(1, 32, num_layers=3, batch_first=True) (fc): Linear(in_features=32, out_features=1, bias=True) )
This is a regression problem: can we train an RNN to accurately predict the next data point, given a current data point?
- The data points are coordinate values, so to compare a predicted and ground_truth point, we'll use a regression loss: the mean squared error.
- It's typical to use an Adam optimizer for recurrent models.
# MSE loss (this is a regression problem) and Adam optimizer with a learning rate of 0.01
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(rnn.parameters(), lr=0.01)
This function takes in an rnn, a number of steps to train for, and returns a trained rnn. This function is also responsible for displaying the loss and the predictions, every so often.
Pay close attention to the hidden state, here:
# train the RNN
def train(rnn, n_steps, print_every, GPU=False):
    """Train `rnn` on successive 180-degree windows of a sine wave.

    Args:
        rnn: the model to train (modified in place and also returned)
        n_steps: the number of training steps; each step uses one fresh window
        print_every: show the loss and a prediction plot every `print_every` steps
        GPU: if True train on 'cuda', otherwise on 'cpu'

    Returns:
        the trained model.

    NOTE: relies on the module-level `seq_length`, `criterion` and `optimizer`.
    """
    device = 'cuda' if GPU else 'cpu'
    # move the model once, up front, rather than on every iteration
    rnn.to(device)
    # initialize the hidden state; nn.RNN treats None as an all-zeros state
    hidden = None
    for step in range(n_steps):
        # build this window's training pair: y is x shifted one time step ahead
        time_steps_deg = np.linspace(step*180, (step+1)*180, seq_length + 1)
        time_steps = time_steps_deg * np.pi/180
        data = np.sin(time_steps)
        data.resize((seq_length + 1, 1)) # input_size=1
        x = data[:-1]
        y = data[1:]
        # convert data into Tensors on the chosen device;
        # unsqueeze gives x a batch_size-of-1 first dimension
        x_tensor = torch.Tensor(x).unsqueeze(0).to(device)
        y_tensor = torch.Tensor(y).to(device)
        # the very first hidden state is None (no .to() method), so skip it
        # explicitly instead of hiding failures behind a bare except
        if hidden is not None:
            hidden = hidden.to(device)
        # outputs from the rnn
        prediction, hidden = rnn(x_tensor, hidden)
        ## Representing Memory ##
        # detach the hidden state from its history so we don't
        # backpropagate through the entire past at every step
        hidden.detach_()
        # zero gradients, compute the loss, backprop and update weights
        optimizer.zero_grad()
        loss = criterion(prediction, y_tensor)
        loss.backward()
        optimizer.step()
        # display loss and predictions
        if step % print_every == 0:
            print('Loss: ', loss.item())
            fig = go.Figure()
            fig.add_trace(
                go.Scatter(
                    x=time_steps_deg[1:], y=x.squeeze(), mode='markers',
                    name='input x', marker_color='red')
            )
            fig.add_trace(
                go.Scatter(
                    # label fixed: this trace shows the model's prediction, not the target
                    x=time_steps_deg[1:], y=prediction.detach().cpu().numpy().flatten(),
                    mode='markers', name='prediction', marker_color='blue')
            )
            fig.update_layout(autosize=False, width=600, height=400,
                              xaxis=dict(title="Angle in degrees"),
                              yaxis=dict(title="Sine of the angle"))
            fig.show()
    return rnn
# train the rnn and monitor results
n_steps = 150
print_every = 25
trained_rnn = train(rnn, n_steps, print_every, GPU=True) # NOTE: GPU=True requires a CUDA device
Loss: 0.4605722427368164
Loss: 0.014374567195773125
Loss: 0.004883641377091408
Loss: 0.001172502525150776
Loss: 0.000737941765692085
Loss: 0.00043186324182897806
After training the model for 150 steps, we can see that the predicted values (blue points) have the same shape as the input sequences (red points), just shifted by one time step. Now it's time to test the model on a sequence spanning 360 degrees with many more points, so that some of these points were never seen by the model before.
## Build the test range (900 to 1260 degrees) with denser sampling than training
test_steps = np.linspace(5*180, 7*180, 65)
test_seq = np.sin(test_steps * np.pi/180)
seq_trace = go.Scatter(
    x=test_steps, y=test_seq, mode='markers',
    name='test sequence', marker_color='red')
fig = go.Figure(seq_trace)
fig.update_layout(autosize=False, width=800, height=400,
                  xaxis=dict(title="Angle in degrees"),
                  yaxis=dict(title="Sine of the angle"),
                  title='The shape of the input sequence')
fig.show()
## Moving the trained model to the CPU for inference
trained_rnn.cpu()
## Formatting the test sequence as (batch=1, seq_len=65, input_size=1)
test_seq_formatted = torch.Tensor(test_seq).unsqueeze(1).unsqueeze(0)
## Feeding the sequence to the model with no prior memory (hidden state = None)
test_outputs, current_h = trained_rnn(test_seq_formatted, None)
## Flattening the model outputs to a 1-D numpy array for plotting
test_outputs = test_outputs.detach().numpy().flatten()
## Plotting the predicted values against the input sequence
fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_seq, mode='markers',
        name='test sequence', marker_color='red')
)
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_outputs, mode='markers',
        name='predicted sequence', marker_color='blue')
)
fig.update_layout(autosize=False, width=800, height=400,
                  xaxis=dict(title="Angle in degrees"),
                  yaxis=dict(title="Sine of the angle"))
fig.show()
We can see that the first few predictions in the sequence are poor compared to the rest. The reason is that when the model starts seeing the data it has no memory (we passed an initial hidden state of None), and it builds up its memory as it sees each new data point in the sequence. To avoid this issue we use what is called *priming*: the aim of priming is to build up the model's memory before testing on a new sequence. We will see more about priming in the upcoming notebook about LSTM cells.
## Creating a prime sequence: the 180 degrees just before the test range,
## sampled with the same spacing as the test sequence (360/64 = 5.625 degrees)
prime_steps = np.linspace(4*180, 5*180-5.625, 32)
prime_seq = np.sin(prime_steps * np.pi/180)
## Building up a memory for the model by running the prime sequence through it
_, current_h = trained_rnn(torch.Tensor(prime_seq).unsqueeze(1).unsqueeze(0), None)
## Feeding the test sequence to the model along with the primed hidden state
test_outputs, current_h = trained_rnn(test_seq_formatted, current_h)
## Flattening the model outputs for plotting
test_outputs = test_outputs.detach().numpy().flatten()
## Plotting the predicted values against the input sequence
fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_seq, mode='markers',
        name='test sequence', marker_color='red')
)
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_outputs, mode='markers',
        name='predicted sequence', marker_color='blue')
)
fig.update_layout(autosize=False, width=800, height=400,
                  xaxis=dict(title="Angle in degrees"),
                  yaxis=dict(title="Sine of the angle"))
fig.show()
We can clearly see that, after building up the model's memory, the predictions are much better for the early time steps. However, building the model's memory depends on the length of the Prime Sequence and the distance between the points of the sequence. In my case here, I set the distance between the points of the Prime Sequence just like the distance between the points of the Test Sequence.
A question you might have: was it really necessary to use an RNN in this scenario? Couldn't we just use a vanilla feed-forward network?
To answer this question we need to take a look at the graph below, we will see that both the green points have values = 0, however, the points following the left green point are of positive values and the points following the right green point are of negative values. Without the model's memory, the model won't be able to differentiate between these 2 points assuming we are feeding the model one data point per time.
angle_180 = np.linspace(0, 360, 65)
sin_angle = np.sin(angle_180 * np.pi/180)
## Highlight the two zero crossings (0 and 180 degrees): they share the value
## sin = 0, but the points that follow them differ in sign, so a memoryless
## model fed one point at a time cannot tell them apart
wave_trace = go.Scatter(
    x=angle_180, y=sin_angle, mode='markers',
    name='sine wave', marker_color='red')
zeros_trace = go.Scatter(
    x=[0, 180], y=[0, 0], mode='markers',
    name='points of interest', marker_color='green')
fig = go.Figure(data=[wave_trace, zeros_trace])
fig.update_layout(autosize=False, width=800, height=400,
                  xaxis=dict(title="Angle in degrees", showgrid=False),
                  yaxis=dict(title="Sine of the angle", showgrid=False))
fig.show()
Let's check the model's behavior if we build a Vanilla Feed Forward model.
## Creating a Vanilla (feed-forward) neural network: one hidden layer of 128 ReLU units
vanilla_model = nn.Sequential(nn.Linear(1, 128),
nn.ReLU(),
nn.Linear(128,1))
## Moving the model to the GPU before creating the optimizer,
## so the optimizer holds references to the CUDA parameters
vanilla_model.cuda()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(vanilla_model.parameters(), lr=0.01)
# train a memoryless model (feed-forward or CNN) on the same sine-wave task.
# NOTE: this redefines the earlier `train` used for the RNN; this version passes
# no hidden state, so it works for any model mapping x -> prediction.
def train(model, n_steps, print_every, GPU=False):
    """Train `model` on successive 180-degree windows of a sine wave.

    Args:
        model: the model to train (modified in place and also returned)
        n_steps: the number of training steps
        print_every: show the loss and a prediction plot every `print_every` steps
        GPU: if True train on 'cuda', otherwise on 'cpu'

    Returns:
        the trained model.

    NOTE: relies on the module-level `seq_length`, `criterion` and `optimizer`.
    """
    device = 'cuda' if GPU else 'cpu'
    # move the model once, up front, rather than on every iteration
    model.to(device)
    for step in range(n_steps):
        # build this window's training pair: y is x shifted one time step ahead
        time_steps_deg = np.linspace(step*180, (step+1)*180, seq_length + 1)
        time_steps = time_steps_deg * np.pi/180
        data = np.sin(time_steps)
        data.resize((seq_length + 1, 1))
        x = data[:-1]
        y = data[1:]
        # convert data into Tensors on the chosen device
        x_tensor = torch.Tensor(x).to(device)
        y_tensor = torch.Tensor(y).to(device)
        # forward pass (no hidden state for memoryless models)
        prediction = model(x_tensor)
        # zero gradients, compute the loss, backprop and update weights
        optimizer.zero_grad()
        loss = criterion(prediction, y_tensor)
        loss.backward()
        optimizer.step()
        # display loss and predictions
        if step % print_every == 0:
            print('Loss: ', loss.item())
            fig = go.Figure()
            fig.add_trace(
                go.Scatter(
                    x=time_steps_deg[1:], y=x.squeeze(), mode='markers',
                    name='input x', marker_color='red')
            )
            fig.add_trace(
                go.Scatter(
                    # label fixed: this trace shows the model's prediction, not the target
                    x=time_steps_deg[1:], y=prediction.cpu().detach().numpy().flatten(),
                    mode='markers', name='prediction', marker_color='blue')
            )
            fig.update_layout(autosize=False, width=600, height=400,
                              xaxis=dict(title="Angle in degrees"),
                              yaxis=dict(title="Sine of the angle"))
            fig.show()
    return model
# train the Feed Forward model and monitor results
n_steps = 150
print_every = 25
van_trained_model = train(vanilla_model, n_steps, print_every, GPU=True) # NOTE: requires CUDA
Loss: 0.9516924023628235
Loss: 0.03770126402378082
Loss: 0.01666671223938465
Loss: 0.01527765579521656
Loss: 0.015252206474542618
Loss: 0.015046871267259121
## Testing the Vanilla Feed Forward model
## Moving the model to the CPU for inference
vanilla_model.cpu()
## Feeding the test sequence to the model (no memory involved: Linear layers
## are applied independently to each time step's single feature)
test_outputs = vanilla_model(test_seq_formatted)
## Flattening the model outputs for plotting
test_outputs = test_outputs.detach().numpy().flatten()
## Plotting the predicted values against the input sequence
fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_seq, mode='markers',
        name='test sequence', marker_color='red')
)
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_outputs, mode='markers',
        name='predicted sequence', marker_color='blue')
)
fig.update_layout(autosize=False, width=800, height=400,
                  xaxis=dict(title="Angle in degrees"),
                  yaxis=dict(title="Sine of the angle"))
fig.show()
We can see the Vanilla Feed Forward model is not predicting 1 time step in the future. It's averaging the positive and negative next time step predictions which leads to predicting the current time step.
Another approach is to give the model the ability to link between the value of the current time step and the other surrounding time steps without giving the model a memory. This can be achieved using a 1D Convolutional layer. The architecture of the model is taken from this blog post
## 1D-CNN model: two Conv1d layers followed by two dense layers,
## each followed by a LeakyReLU activation
class CNN_model(nn.Module):
    def __init__(self):
        super().__init__()
        ## Convolutional layers: kernel_size=3 with padding=1 preserves length
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=100,
                               kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(in_channels=100, out_channels=100,
                               kernel_size=3, stride=1, padding=1)
        ## Fully connected layers
        self.fc1 = nn.Linear(100, 100)
        self.fc2 = nn.Linear(100, 1)

    def forward(self, x):
        """Map a batch-of-1 sequence of scalars to one prediction per time step."""
        # LeakyReLU is stateless, so a single instance serves every layer
        leaky = nn.LeakyReLU(0.03)
        # reshape to (batch=1, channels=1, seq_len) as Conv1d expects
        out = leaky(self.conv1(x.reshape(1, 1, -1)))
        out = leaky(self.conv2(out))
        # (1, 100, seq_len) -> (seq_len, 100): one feature row per time step
        out = out.squeeze().T
        out = leaky(self.fc1(out))
        return leaky(self.fc2(out))
## Instantiate the CNN and show its layer summary
cnn_model = CNN_model()
print(cnn_model)
CNN_model( (conv1): Conv1d(1, 100, kernel_size=(3,), stride=(1,), padding=(1,)) (conv2): Conv1d(100, 100, kernel_size=(3,), stride=(1,), padding=(1,)) (fc1): Linear(in_features=100, out_features=100, bias=True) (fc2): Linear(in_features=100, out_features=1, bias=True) )
## Create a fresh instance of the CNN model (replacing the one printed above)
cnn_model = CNN_model()
## Moving the model to the GPU before creating the optimizer,
## so the optimizer holds references to the CUDA parameters
cnn_model.cuda()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(cnn_model.parameters(), lr=0.01)
# train the CNN model and monitor results (reuses the memoryless `train`)
n_steps = 150
print_every = 25
cnn_trained_model = train(cnn_model, n_steps, print_every, GPU=True) # NOTE: requires CUDA
Loss: 0.44369369745254517
Loss: 0.023980261757969856
Loss: 0.007767544128000736
Loss: 0.0005575448158197105
Loss: 7.490362622775137e-05
Loss: 7.572841423097998e-05
## Moving the model to the CPU for inference
cnn_model.cpu()
## Feeding the test sequence to the model (the CNN has no memory; it sees
## neighbouring points through its convolution windows instead)
test_outputs = cnn_model(test_seq_formatted)
## Flattening the model outputs for plotting
test_outputs = test_outputs.detach().numpy().flatten()
## Plotting the predicted values against the input sequence
fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_seq, mode='markers',
        name='test sequence', marker_color='red')
)
fig.add_trace(
    go.Scatter(
        x=test_steps, y=test_outputs, mode='markers',
        name='predicted sequence', marker_color='blue')
)
fig.update_layout(autosize=False, width=800, height=400,
                  xaxis=dict(title="Angle in degrees"),
                  yaxis=dict(title="Sine of the angle"))
fig.show()
We can see the CNN model is doing a good job just like the RNN model because the model is taking into consideration the values of the surrounding points. That's why CNN models are a very good candidate in scenarios that might require RNN models without the complications of RNNs.